dc129c2d7ff1ab109783ccbbe43e805208126c23,components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java,PodsConsumerTask,run,#,78

Before Change


        
        @Override
        public void run() {
            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) {
                if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
                    getEndpoint().getKubernetesClient().pods()
                            .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace())
                            .watch(new Watcher<Pod>() {

                                @Override
                                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
                                        Pod resource) {
                                    PodEvent pe = new PodEvent(action, resource);
                                    Exchange exchange = getEndpoint().createExchange();
                                    exchange.getIn().setBody(pe.getPod());
                                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, pe.getAction());
                                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
                                    try {
                                        processor.process(exchange);
                                    } catch (Exception e) {
                                        getExceptionHandler().handleException("Error during processing", exchange, e);
                                    }
                                }

                                @Override
                                public void onClose(KubernetesClientException cause) {
                                    if (cause != null) {
                                        LOG.error(cause.getMessage(), cause);
                                    }

                                }
                            });
                } else {
                    getEndpoint().getKubernetesClient().pods().watch(new Watcher<Pod>() {

                        @Override
                        public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Pod resource) {
                            PodEvent pe = new PodEvent(action, resource);
                            Exchange exchange = getEndpoint().createExchange();
                            exchange.getIn().setBody(pe.getPod());
                            exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, pe.getAction());
                            exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
                            try {
                                processor.process(exchange);
                            } catch (Exception e) {
                                getExceptionHandler().handleException("Error during processing", exchange, e);
                            }
                        }

                        @Override
                        public void onClose(KubernetesClientException cause) {
                            if (cause != null) {
                                LOG.error(cause.getMessage(), cause);
                            }
                        }
                    });
                }
            }
        }

After Change


        @Override
        public void run() {
            ClientMixedOperation<Pod, PodList, DoneablePod, ClientPodResource<Pod, DoneablePod>> w = getEndpoint().getKubernetesClient().pods();
            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespace())) {
                w.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespace());
            }
            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelKey()) 
                && ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getLabelValue())) {
                w.withLabel(getEndpoint().getKubernetesConfiguration().getLabelKey(), getEndpoint().getKubernetesConfiguration().getLabelValue());
            }
            if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getResourceName())) {
                w.withName(getEndpoint().getKubernetesConfiguration().getResourceName());
            }
            w.watch(new Watcher<Pod>() {

                @Override
                public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
                    Pod resource) {
                    PodEvent pe = new PodEvent(action, resource);
                    Exchange exchange = getEndpoint().createExchange();
                    exchange.getIn().setBody(pe.getPod());
                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, pe.getAction());
                    exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis());
                    try {
                        processor.process(exchange);
                    } catch (Exception e) {
                        getExceptionHandler().handleException("Error during processing", exchange, e);
                    }
                }

                @Override
                public void onClose(KubernetesClientException cause) {
                    if (cause != null) {
                        LOG.error(cause.getMessage(), cause);
                    }

                }
            });
        } 
    }
}